Spark Streaming之window(窗口操作) |
您所在的位置:网站首页 › structured streaming window › Spark Streaming之window(窗口操作) |
Spark Streaming 还提供了窗口的计算,它允许通过滑动窗口对数据进行转换,窗口转换操作如下图 所示: 对于窗口操作而言,在其窗口内部会有 N 个批处理数据,批处理数据的大小由窗口间隔决定,而窗口间隔指的就是窗口的持续时间。 在窗口操作中,只有窗口的长度满足了才会触发批数据的处理。除了窗口的长度,窗口操作还有另一个重要的参数,即滑动间隔,它指的是经过多长时间窗口滑动一次形成新的窗口。 滑动间隔默认情况下和批次间隔相同,而窗口间隔一般设置得要比它们两个大。在这里必须注意的一点是,滑动间隔和窗口间隔的大小一定得设置为批处理间隔的整数倍。 如下图所示,批处理间隔是 1 个时间单位,窗口间隔是 3 个时间单位,滑动间隔是 2 个时间单位。对于初始的窗口(time 1~time 3),只有窗口间隔满足了才会触发数据的处理。 注意: 有可能初始的窗口没有被流入的数据撑满,但是随着时间的推进/窗口最终会被撑满。每过 2 个时间单位,窗口滑动一次,这时会有新的数据流入窗口,窗口则移去最早的 2 个时间单位的数据,而与最新的 2 个时间单位的数据进行汇总形成新的窗口(time 3~ time 5)。
该操作由一个DStream对象调用,传入一个窗口长度参数,一个窗口移动速率参数,然后将当前时刻当前长度窗口中的元素取出形成一个新的DStream。 示例: 以长度为3,移动速率为1截取源DStream中的元素形成新的DStream。 val conf = new SparkConf().setMaster("local[2]").setAppName("TestCount") val ssc = new StreamingContext(conf,Seconds(1)) val lines = ssc.socketTextStream("localhost",9999) val words = lines.flatMap (_.split(" ")) val windowCounts = words.window(Second(3),Second(1)) windowCounts.print() //通过start()启动消息采集和处理 ssc.start() //启动完成后就不能再做其它操作 //等待计算完成 ssc.awaitTermination()基本上每秒输入一个字母,然后取出当前时刻3秒这个长度中的所有元素,打印出来。从上面的截图中可以看到,下一秒时已经看不到a了,再下一秒,已经看不到b和c了。表示a, b, c已经不在当前的窗口中。 2. countByWindow(windowLength,slideInterval)返回指定长度窗口中的元素个数 示例: 统计当前3秒长度的时间窗口的DStream中元素的个数: val conf = new SparkConf().setMaster("local[2]").setAppName("TestCount") val ssc = new StreamingContext(conf,Seconds(1)) val lines = ssc.socketTextStream |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |